package com.lengxf.flink.demo;

import java.util.regex.Pattern;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.operators.UnsortedGrouping;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

/**
 * Batch word-count demo built on Flink's DataSet API.
 *
 * <p>Reads a text file line by line, splits every line into words, turns each word into a
 * {@code (word, 1)} tuple, groups the tuples by word, sums the counts per group and prints
 * the result.
 */
public class WordCountBatchDemo {

    /** Input file used when no path is supplied on the command line. */
    private static final String DEFAULT_INPUT_PATH = "flink/src/main/resources/word.txt";

    /** Word delimiter: one or more whitespace characters. Compiled once and reused for every line. */
    private static final Pattern WHITESPACE = Pattern.compile("\\s+");

    /**
     * Builds and runs the word-count job.
     *
     * @param args optional; if present, {@code args[0]} is the path of the text file to read,
     *             otherwise {@link #DEFAULT_INPUT_PATH} is used
     * @throws Exception if reading the input or running the job fails
     */
    public static void main(String[] args) throws Exception {

        // 1. Create the execution environment.
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // 2. Read the input, one element per line.
        String inputPath = (args != null && args.length > 0) ? args[0] : DEFAULT_INPUT_PATH;
        DataSource<String> lineDS = env.readTextFile(inputPath);

        // 3. Split each line into words and map every word to a (word, 1) tuple.
        FlatMapOperator<String, Tuple2<String, Integer>> wordAndOne = lineDS.flatMap(getFlatMapper());

        // 4. Group by the word (tuple field 0).
        UnsortedGrouping<Tuple2<String, Integer>> groupedByWord = wordAndOne.groupBy(0);

        // 5. Aggregate inside each group by summing the count (tuple field 1).
        AggregateOperator<Tuple2<String, Integer>> sum = groupedByWord.sum(1);

        // 6. Output the result.
        sum.print();
    }

    /**
     * Creates the function that turns one input line into {@code (word, 1)} tuples.
     *
     * <p>Words are separated by any run of whitespace (spaces, tabs, ...). Empty tokens, which
     * appear for blank lines or lines starting with whitespace, are skipped so they are not
     * counted as a word.
     *
     * <p>Kept as an anonymous class: per Flink's documentation on Java lambdas, a lambda would
     * lose the generic {@code Tuple2} type information unless explicit type hints are supplied.
     *
     * @return a flat-map function emitting one {@code (word, 1)} tuple per word of the line
     */
    private static FlatMapFunction<String, Tuple2<String, Integer>> getFlatMapper() {
        return new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String line, Collector<Tuple2<String, Integer>> out) throws Exception {
                for (String word : WHITESPACE.split(line)) {
                    // A blank line or leading whitespace yields an empty first token.
                    if (word.isEmpty()) {
                        continue;
                    }
                    out.collect(Tuple2.of(word, 1));
                }
            }
        };
    }

}